注意本文采用最新版本进行Kafka的内核原理剖析,新版本每一个Consumer通过独立的线程,来管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。这就是新版的技术革新。类似于Linux I/O模型或者Select NIO 模型。

Poll为什么要设置一个超时参数

  • 条件:
  • 1:获取足够多的可用数据
  • 2:等待时间超过指定的超时时间。
  • 目的在于让Consumer主线程定期的””苏醒”去做其他事情。比如:定期的执行常规任务,(比如写日志,写库等)。
  • 获取消息,然后执行业务逻辑。

位移精度

  • 最少一次 -> 消息会被重复处理
  • 最多一次 -> 消息会丢失,但不会被重复处理。
  • 精确一次 -> 一定会被处理,且也只会处理一次。

位移角色

  • 上次提交位移 :last committed offset
  • 当前位置 :current position
  • 水位 : High watermark
  • 日志终端位移: (Log End Offset)

位移管理

consumer的位移提交最终会向group coordinator来提交,不过这里重点需要重新说明一下:组协调者coordinator负责管理所有的Consumer实例。而且coordinator运行在broker上(通过选举出某个broker),不过请注意新版本coordinator只负责做组管理。

但是具体的reblance分区分配策略目前已经交由Consumer客户端。这样就解耦了组管理和分区分配。

权利下放的优势:

  • 如果需要分配就貌似需要重启整个kafka集群。
  • 在Consumer端可以定制分区分配策略。
  • 每一个consumer位移提交时,都会向_consumer_offsets对应的分区上追加写入一条消息。如果某一个consumer为同一个group的同一个topic同一个分区提交多次位移,很显然我们只关心最新一次提交的位移。

reblance的触发条件

  • 组订阅发生变更,比如基于正则表达式订阅,当匹配到新的topic创建时,组的订阅就会发生变更。
  • 组的topic分区数发生变更,通过命令行脚本增加了订阅topic的分区数。
  • 组成员发生变更:新加入组以及离开组。

reblance 分配策略

range分区分配策略

举例如下:一个拥有十个分区(0,1,2…..,9)的topic,相同group拥有三个consumerid为a,b,c的消费者:

  • consumer a分配对应的分区号为[0,4),即0,1,2,3前面四个分区

  • consumer b 分配对应分区4,5,6中间三个分区

  • consumer c 分配对应分区7,8,9最后三个分区。

    class RangeAssignor() extends PartitionAssignor with Logging {

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    def assign(ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
    new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
    val curConsumers = ctx.consumersForTopic(topic)
    val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
    " for topic " + topic + " with consumers: " + curConsumers)

    for (consumerThreadId <- curConsumers) {
    val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
    assert(myConsumerPosition >= 0)
    val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
    val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

    /**
    * Range-partition the sorted partitions to consumers for better locality.
    * The first few consumers pick up an extra partition, if any.
    */
    if (nParts <= 0)
    warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
    else {
    for (i <- startPart until startPart + nParts) {
    val partition = curPartitions(i)
    info(consumerThreadId + " attempting to claim partition " + partition)
    // record the partition ownership decision
    val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
    assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
    }
    }
    }
    }

    源码剖析如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    curConsumers=(a,b,c)
    curPartitions=(0,1,2,3,4,5,6,7,8,9)
    nPartsPerConsumer=10/3 =3
    nConsumersWithExtraPart=10%3 =1

    a:
    myConsumerPosition= curConsumers.indexof(a) =0
    startPart= 3*0+0.min(1) = 0
    nParts = 3+(if (0 + 1 > 1) 0 else 1)=3+1=4
    b:
    myConsumerPosition=1
    c:
    myConsumerPosition

round-robin分区分配策略

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。举例如下:假设消费组中有2个消费者C0和C1,都订阅了主题topic0 和 topic1,并且每个主题都有3个分区,进行hashCode 排序 后,顺序为:topic0_0、topic0_1、topic0_2、topic1_0、topic1_1、topic1_2。最终的分配结果为:

消费者consumer0:topic0_0、topic0_2 、 topic1_1

消费者consumer1:topic0_1、topic1_0、 topic1_2

使用RoundRobin策略有两个前提条件必须满足:

  • 同一个Consumer Group里面的所有消费者的num.streams必须相等;
  • 每个消费者订阅的主题必须相同。

所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,最后按照round-robin风格将分区分别分配给不同的消费者线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
info("Consumer %s rebalancing the following partitions for topic %s: %s"
.format(ctx.consumerId, topic, partitions))
partitions.map(partition => {
TopicAndPartition(topic, partition)
})
}.toSeq.sortWith((topicPartition1, topicPartition2) => {
/*
* Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
* up on one consumer (if it has a high enough stream count).
*/
topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})

StickyAssignor分区分配策略(摘录)

  • 分区的分配要尽可能的均匀;
  • 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。

假设消费组内有3个消费者:C0、C1和C2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区。最终的分配结果如下:

1
2
3
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1

假设此时消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:

1
2
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1

RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:

1
2
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配仍然保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

reblance generation (代代不同)

主要作用在于防止无效的offset提交,原因在于若上一届的consumer成员因为某些原因延迟提交了offset,同时被踢出group组,那么新一届的group组成员分区分配结束后,老一届的consumer再次提交老的offset就会出问题。因此采用reblance generation ,老的请求就会被拒绝。

reblance 扫尾工作

每一次reblance操作之前,都会检查用户是否设置了自动提交位移,如果是,则帮助用户提交。如没有设置,会在监听器中回调用户的提交程序。